package consumer

import (
	"github.com/Shopify/sarama"
	log "github.com/alecthomas/log4go"
	"github.com/cnfinder/sarama-cluster"
	"os"
	"os/signal"
)

type FinderKafkaConsumer struct {
	Consumer *cluster.Consumer
	brokers []string
	groupId string
	signals chan os.Signal
	Config *cluster.Config
}

var kafkaConfig *cluster.Config

func init()  {
	kafkaConfig = cluster.NewConfig()
	kafkaConfig.Consumer.Return.Errors = true
	kafkaConfig.Group.Return.Notifications = true
	kafkaConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	kafkaConfig.Consumer.Offsets.AutoCommit.Enable = false // 手动提交
}

func  GetConfig() *cluster.Config {
	return kafkaConfig
}

func NewFinderKafkaConsumer(brokers []string,groupId string,topics []string) (*FinderKafkaConsumer,error) {
	kafka:=&FinderKafkaConsumer{brokers:brokers}
	kafka.groupId=groupId




	// init consumer
	consumer, err := cluster.NewConsumer(brokers, groupId, topics, kafkaConfig)
	if err != nil {

		return kafka,err
	}
	kafka.Consumer=consumer



	return kafka,nil

}


//启动消费
func (this *FinderKafkaConsumer) Consume(errorFunc func(error),notificationsFunc func(*cluster.Notification),onMessage func(*sarama.ConsumerMessage)) {

	// trap SIGINT to trigger a shutdown
	this.signals = make(chan os.Signal, 1)
	signal.Notify(this.signals, os.Interrupt)

	if errorFunc!=nil{
		// consume errors
		go func() {
			for err := range this.Consumer.Errors() {
				errorFunc(err)
			}

		}()
	}

	if notificationsFunc!=nil{
		// consume notifications
		go func() {

			for note := range this.Consumer.Notifications() {
				notificationsFunc(note)
			}
		}()
	}


	// consume messages, watch signals
	var successes int
Loop:
	for {
		select {
		case msg, ok := <-this.Consumer.Messages():
			if ok {
				log.Debug("%s:%s/%d/%d\t%s\t%s\n", this.groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				onMessage(msg)

				this.Consumer.MarkOffset(msg, "")  // mark message as processed
				successes++
			}
		case <-this.signals:
			break Loop
		}
	}
}